What is storageDecorator

In the previous analysis we saw that, in both NewStorage() and NewREST(), it is Decorator() that produces the core storageInterface (i.e. the ETCD Helper).

//***/pkg/registry/rbac/role/etcd/etcd.go NewREST()***//
storageInterface, dFunc := opts.Decorator(
    opts.StorageConfig,
    cachesize.GetWatchCacheSizeByResource(cachesize.Roles),
    &rbac.Role{},
    prefix,
    role.Strategy,
    newListFunc,
    storage.NoTriggerPublisher,
)

//***/pkg/registry/core/node/etcd/etcd.go NewStorage()***//
storageInterface, dFunc := opts.Decorator(
    opts.StorageConfig,
    cachesize.GetWatchCacheSizeByResource(cachesize.Nodes),
    &api.Node{},
    prefix,
    node.Strategy,
    newListFunc,
    node.NodeNameTriggerFunc)

So storageDecorator is what produces the ETCD Helper. Let's first look at how storageDecorator is assigned in /pkg/master/master.go:

//*** assign the storageDecorator ***//
if c.EnableWatchCache {
    restOptionsFactory.storageDecorator = registry.StorageWithCacher
} else {
    restOptionsFactory.storageDecorator = generic.UndecoratedStorage
}

So when EnableWatchCache is true, storageDecorator is registry.StorageWithCacher; otherwise it is generic.UndecoratedStorage. Since the API server's --watch-cache flag defaults to true, storageDecorator is usually registry.StorageWithCacher. To go from simple to complex, this analysis first covers the simple generic.UndecoratedStorage, then the more involved registry.StorageWithCacher.
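
Both assignments above must satisfy the StorageDecorator function type. As a hedged sketch (based on /pkg/registry/generic/storage_decorator.go of this era; treat the exact form as an assumption), it looks like:

// StorageDecorator is a function signature for producing a storage.Interface
// and its DestroyFunc from the given parameters -- parameter order matches
// the Decorator() call sites shown above.
type StorageDecorator func(
    config *storagebackend.Config,
    capacity int,
    objectType runtime.Object,
    resourcePrefix string,
    scopeStrategy rest.NamespaceScopedStrategy,
    newListFunc func() runtime.Object,
    trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)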

generic.UndecoratedStorage

As its name suggests, generic.UndecoratedStorage returns the storageInterface without any decoration. It is defined in /pkg/registry/generic/storage_decorator.go:

// Returns given 'storageInterface' without any decoration.
func UndecoratedStorage(
    config *storagebackend.Config,
    capacity int,
    objectType runtime.Object,
    resourcePrefix string,
    scopeStrategy rest.NamespaceScopedStrategy,
    newListFunc func() runtime.Object,
    trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
    return NewRawStorage(config)
}

NewRawStorage() is defined in the same file, /pkg/registry/generic/storage_decorator.go:

// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
    //*** calls Create() in /pkg/storage/storagebackend/factory/factory.go ***//
    s, d, err := factory.Create(*config)
    if err != nil {
        glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
    }
    return s, d
}

NewRawStorage() simply calls factory.Create() to build the low-level key-value storage. factory.Create() produces either an ETCD2 Helper or an ETCD3 Helper depending on the configuration; the details will be covered in the next analysis.
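
As a preview, here is a hedged sketch of how factory.Create() dispatches on the configured backend type (a simplification; the real implementation is the subject of the next article, and helper names like newETCD2Storage/newETCD3Storage are assumptions based on the source of this era):

//*** a sketch of Create() in /pkg/storage/storagebackend/factory/factory.go ***//
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
        return newETCD2Storage(c) // ETCD2 Helper
    case storagebackend.StorageTypeETCD3:
        return newETCD3Storage(c) // ETCD3 Helper
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}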

registry.StorageWithCacher

registry.StorageWithCacher also calls NewRawStorage() to build the low-level key-value storage. The difference is that registry.StorageWithCacher wraps that key-value storage; in particular, in the List/Watch path it uses a single-reflector, multi-watcher model.

registry.StorageWithCacher is defined in /pkg/registry/generic/registry/storage_factory.go. Its main flow is:

  1. Call NewRawStorage() to build the ETCD Helper;
  2. Build a CacherConfig whose Storage field is the ETCD Helper just built;
  3. Call NewCacherFromConfig() to build the Cacher.
// Creates a cacher based given storageConfig.
func StorageWithCacher(
    storageConfig *storagebackend.Config,
    capacity int,
    objectType runtime.Object,
    resourcePrefix string,
    scopeStrategy rest.NamespaceScopedStrategy,
    newListFunc func() runtime.Object,
    triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
    s, d := generic.NewRawStorage(storageConfig)
    // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
    // Currently it has two layers of same storage interface -- cacher and low level kv.
    cacherConfig := storage.CacherConfig{
        CacheCapacity:        capacity,
        Storage:              s,
        Versioner:            etcdstorage.APIObjectVersioner{},
        Type:                 objectType,
        ResourcePrefix:       resourcePrefix,
        NewListFunc:          newListFunc,
        TriggerPublisherFunc: triggerFunc,
        Codec:                storageConfig.Codec,
    }
    if scopeStrategy.NamespaceScoped() {
        cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
            return storage.NamespaceKeyFunc(resourcePrefix, obj)
        }
    } else {
        cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
            return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
        }
    }
    cacher := storage.NewCacherFromConfig(cacherConfig)
    destroyFunc := func() {
        cacher.Stop()
        d()
    }
    return cacher, destroyFunc
}

With that, let's look at the definition of Cacher.

Cacher

Cacher is defined in /pkg/storage/cacher.go:

// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {
    // HighWaterMarks for performance debugging.
    // Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
    // See: https://golang.org/pkg/sync/atomic/ for more information
    incomingHWM HighWaterMark
    // Incoming events that should be dispatched to watchers.
    incoming chan watchCacheEvent

    sync.RWMutex

    // Before accessing the cacher's cache, wait for the ready to be ok.
    // This is necessary to prevent users from accessing structures that are
    // uninitialized or are being repopulated right now.
    // ready needs to be set to false when the cacher is paused or stopped.
    // ready needs to be set to true when the cacher is ready to use after
    // initialization.
    ready *ready

    // Underlying storage.Interface.
    storage Interface

    // Expected type of objects in the underlying cache.
    objectType reflect.Type

    // "sliding window" of recent changes of objects and the current state.
    watchCache *watchCache
    reflector  *cache.Reflector

    // Versioner is used to handle resource versions.
    versioner Versioner

    // triggerFunc is used for optimizing amount of watchers that needs to process
    // an incoming event.
    triggerFunc TriggerPublisherFunc
    // watchers is mapping from the value of trigger function that a
    // watcher is interested into the watchers
    watcherIdx int
    watchers   indexedWatchers

    // Handling graceful termination.
    stopLock sync.RWMutex
    stopped  bool
    stopCh   chan struct{}
    stopWg   sync.WaitGroup
}

The main fields of Cacher are:

  • incoming: events put onto this channel are dispatched to each watcher;
  • storage: the underlying key-value storage, i.e. the ETCD Helper;
  • watchCache: works together with the reflector and processes the events the reflector delivers;
  • reflector: consumes events from the listerWatcher and hands them to the watchCache;
  • watchers: the set of registered watchers;
  • versioner: handles the resource versions of the data stored in ETCD.
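
Putting these fields together, the single-reflector, multi-watcher event flow can be sketched as follows (a summary of the code analyzed below, not verbatim source):

// Event flow through a Cacher (sketch):
//
//   etcd (underlying storage.Interface)
//     -> reflector.ListAndWatch             // lists and watches the ETCD Helper
//     -> watchCache.Add/Update/Delete       // caches the event, builds a watchCacheEvent
//     -> cacher.processEvent                // the watchCache's onEvent callback
//     -> cacher.incoming channel
//     -> cacher.dispatchEvents/dispatchEvent
//     -> each cacheWatcher's input channel  // via cacheWatcher.add
//     -> cacheWatcher.process               // filters, converts to watch.Event
//     -> cacheWatcher's result channel      // exposed through ResultChan()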

NewCacherFromConfig()

The flow of NewCacherFromConfig() is:

  1. Build the watchCache;
  2. Build the listerWatcher, which wraps the ETCD Helper;
  3. Build the cacher;
  4. Set the watchCache's onEvent callback to cacher.processEvent;
  5. Start cacher.dispatchEvents(), the dispatch loop, together with the wait.Until loop that keeps startCaching() running (see the sketch after the code);
  6. Return the cacher.
// Create a new Cacher responsible from service WATCH and LIST requests from its
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
    watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
    listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

    // Give this error when it is constructed rather than when you get the
    // first watch item, because it's much easier to track down that way.
    if obj, ok := config.Type.(runtime.Object); ok {
        if err := runtime.CheckCodec(config.Codec, obj); err != nil {
            panic("storage codec doesn't seem to match given type: " + err.Error())
        }
    }

    cacher := &Cacher{
        ready:       newReady(),
        storage:     config.Storage,
        objectType:  reflect.TypeOf(config.Type),
        watchCache:  watchCache,
        reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
        versioner:   config.Versioner,
        triggerFunc: config.TriggerPublisherFunc,
        watcherIdx:  0,
        watchers: indexedWatchers{
            allWatchers:   make(map[int]*cacheWatcher),
            valueWatchers: make(map[string]watchersMap),
        },
        // TODO: Figure out the correct value for the buffer size.
        incoming: make(chan watchCacheEvent, 100),
        // We need to (potentially) stop both:
        // - wait.Until go-routine
        // - reflector.ListAndWatch
        // and there are no guarantees on the order that they will stop.
        // So we will be simply closing the channel, and synchronizing on the WaitGroup.
        stopCh: make(chan struct{}),
    }
    watchCache.SetOnEvent(cacher.processEvent)
    go cacher.dispatchEvents()

    stopCh := cacher.stopCh
    cacher.stopWg.Add(1)
    go func() {
        defer cacher.stopWg.Done()
        wait.Until(
            func() {
                if !cacher.isStopped() {
                    cacher.startCaching(stopCh)
                }
            }, time.Second, stopCh,
        )
    }()
    return cacher
}
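
The wait.Until goroutine above repeatedly invokes startCaching(), which (re)runs the reflector against the underlying storage and flips the ready flag once an initial list succeeds. A hedged sketch of it (simplified from the cacher.go of this era):

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
    // Mark the cacher ready only after the reflector's initial list succeeds;
    // mark it not-ready again when the connection is lost and we exit.
    successfulList := false
    c.watchCache.SetOnReplace(func() {
        successfulList = true
        c.ready.set(true)
    })
    defer func() {
        if successfulList {
            c.ready.set(false)
        }
    }()

    c.terminateAllWatchers()
    // startCaching is called in a loop by wait.Until, so no retry loop is needed here.
    if err := c.reflector.ListAndWatch(stopChannel); err != nil {
        glog.Errorf("unexpected ListAndWatch error: %v", err)
    }
}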

So the most important methods of Cacher are processEvent() and dispatchEvents().

processEvent()

processEvent() is simple: it puts the watchCacheEvent onto the Cacher's incoming channel. This gives us the producer side of the incoming channel.

//*** put the event onto incoming ***//
func (c *Cacher) processEvent(event watchCacheEvent) {
    if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
        // Monitor if this gets backed up, and how much.
        glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
    }
    c.incoming <- event
}
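
incomingHWM is a high-water mark used purely for debugging channel backlog: Update() returns true only when a new maximum is seen, so the log line fires once per new peak. A hedged sketch of the HighWaterMark type (defined alongside the Cacher in package storage in this era; treat the exact location as an assumption):

// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64

// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
    for {
        old := atomic.LoadInt64((*int64)(hwm))
        if current <= old {
            return false
        }
        if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
            return true
        }
    }
}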

dispatchEvents()

dispatchEvents() consumes watchCacheEvents from incoming and calls dispatchEvent() on each one.

//*** consume events from incoming and dispatch them via dispatchEvent() ***//
func (c *Cacher) dispatchEvents() {
    for {
        select {
        case event, ok := <-c.incoming:
            if !ok {
                return
            }
            c.dispatchEvent(&event)
        case <-c.stopCh:
            return
        }
    }
}

dispatchEvent()

dispatchEvent() hands the watchCacheEvent to every relevant watcher in the Cacher: all non-trigger watchers unconditionally, plus the value watchers whose trigger values match the event.

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
    triggerValues, supported := c.triggerValues(event)

    // TODO: For now we assume we have a given <timeout> budget for dispatching
    // a single event. We should consider changing to the approach with:
    // - budget has upper bound at <max_timeout>
    // - we add <portion> to current timeout every second
    timeout := time.Duration(250) * time.Millisecond

    c.Lock()
    defer c.Unlock()
    // Iterate over "allWatchers" no matter what the trigger function is.
    for _, watcher := range c.watchers.allWatchers {
        watcher.add(event, &timeout)
    }
    if supported {
        // Iterate over watchers interested in the given values of the trigger.
        for _, triggerValue := range triggerValues {
            for _, watcher := range c.watchers.valueWatchers[triggerValue] {
                watcher.add(event, &timeout)
            }
        }
    } else {
        // supported equal to false generally means that trigger function
        // is not defined (or not aware of any indexes). In this case,
        // watchers filters should generally also don't generate any
        // trigger values, but can cause problems in case of some
        // misconfiguration. Thus we paranoidly leave this branch.
        // Iterate over watchers interested in exact values for all values.
        for _, watchers := range c.watchers.valueWatchers {
            for _, watcher := range watchers {
                watcher.add(event, &timeout)
            }
        }
    }
}
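
The triggerValues() helper used above extracts the trigger value(s) from the event's current and previous objects, deduplicated. A hedged sketch of what it does (reconstructed from the source of this era, not verbatim):

// triggerValues returns the trigger values of the event's Object and
// PrevObject, and whether any value was extracted at all.
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
    if c.triggerFunc == nil {
        return nil, false
    }
    result := make([]string, 0, 2)
    if matchValues := c.triggerFunc(event.Object); len(matchValues) > 0 {
        result = append(result, matchValues[0].Value)
    }
    if event.PrevObject == nil {
        return result, len(result) > 0
    }
    if prevMatchValues := c.triggerFunc(event.PrevObject); len(prevMatchValues) > 0 {
        if len(result) == 0 || result[0] != prevMatchValues[0].Value {
            result = append(result, prevMatchValues[0].Value)
        }
    }
    return result, len(result) > 0
}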

Watch()

Cacher's Watch() method builds a cacheWatcher via newCacheWatcher(), registers it with the Cacher, and returns it.

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
    watchRV, err := ParseWatchResourceVersion(resourceVersion)
    if err != nil {
        return nil, err
    }

    c.ready.wait()

    // We explicitly use thread unsafe version and do locking ourself to ensure that
    // no new events will be processed in the meantime. The watchCache will be unlocked
    // on return from this function.
    // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
    // underlying watchCache is calling processEvent under its lock.
    c.watchCache.RLock()
    defer c.watchCache.RUnlock()
    initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
    if err != nil {
        // To match the uncached watch implementation, once we have passed authn/authz/admission,
        // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
        // rather than a directly returned error.
        return newErrWatcher(err), nil
    }

    triggerValue, triggerSupported := "", false
    // TODO: Currently we assume that in a given Cacher object, any <predicate> that is
    // passed here is aware of exactly the same trigger (at most one).
    // Thus, either 0 or 1 values will be returned.
    if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
        triggerValue, triggerSupported = matchValues[0].Value, true
    }

    // If there is triggerFunc defined, but triggerSupported is false,
    // we can't narrow the amount of events significantly at this point.
    //
    // That said, currently triggerFunc is defined only for Pods and Nodes,
    // and there is only constant number of watchers for which triggerSupported
    // is false (excluding those issues explicitly by users).
    // Thus, to reduce the risk of those watchers blocking all watchers of a
    // given resource in the system, we increase the sizes of buffers for them.
    chanSize := 10
    if c.triggerFunc != nil && !triggerSupported {
        // TODO: We should tune this value and ideally make it dependent on the
        // number of objects of a given type and/or their churn.
        chanSize = 1000
    }

    c.Lock()
    defer c.Unlock()
    forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
    watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)
    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
    c.watcherIdx++
    return watcher, nil
}
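
The forget function passed to newCacheWatcher() is what removes the watcher from the Cacher's index when it stops or can no longer keep up. A hedged sketch:

func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
    return func(lock bool) {
        if lock {
            c.Lock()
            defer c.Unlock()
        }
        // The watcher may already have been removed (e.g. simultaneous Stop()
        // and terminateAllWatchers()); deleting it twice is harmless.
        c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
    }
}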

cacheWatcher

Now let's look at cacheWatcher, also defined in /pkg/storage/cacher.go:

type cacheWatcher struct {
    sync.Mutex
    input   chan watchCacheEvent // fed by Cacher.dispatchEvent() via add()
    result  chan watch.Event     // consumed by callers through ResultChan()
    filter  filterObjectFunc
    done    chan struct{}
    stopped bool
    forget  func(bool)
}

newCacheWatcher()

newCacheWatcher() builds a cacheWatcher and starts its process() goroutine.

//*** build a cacheWatcher ***//
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
    watcher := &cacheWatcher{
        input:   make(chan watchCacheEvent, chanSize),
        result:  make(chan watch.Event, chanSize),
        done:    make(chan struct{}),
        filter:  filter,
        stopped: false,
        forget:  forget,
    }
    go watcher.process(initEvents, resourceVersion)
    return watcher
}

process()

process() first replays the initEvents, then consumes watchCacheEvents from the input channel, calling sendWatchCacheEvent() on each one.

func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
    defer utilruntime.HandleCrash()

    // Check how long we are processing initEvents.
    // As long as these are not processed, we are not processing
    // any incoming events, so if it takes long, we may actually
    // block all watchers for some time.
    // TODO: From the logs it seems that there happens processing
    // times even up to 1s which is very long. However, this doesn't
    // depend that much on the number of initEvents. E.g. from the
    // 2000-node Kubemark run we have logs like this, e.g.:
    // ... processing 13862 initEvents took 66.808689ms
    // ... processing 14040 initEvents took 993.532539ms
    // We should understand what is blocking us in those cases (e.g.
    // is it lack of CPU, network, or sth else) and potentially
    // consider increase size of result buffer in those cases.
    const initProcessThreshold = 500 * time.Millisecond
    startTime := time.Now()
    for _, event := range initEvents {
        c.sendWatchCacheEvent(&event)
    }
    processingTime := time.Since(startTime)
    if processingTime > initProcessThreshold {
        objType := "<null>"
        if len(initEvents) > 0 {
            objType = reflect.TypeOf(initEvents[0].Object).String()
        }
        glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
    }

    defer close(c.result)
    defer c.Stop()
    for {
        event, ok := <-c.input
        if !ok {
            return
        }
        // only send events newer than resourceVersion
        if event.ResourceVersion > resourceVersion {
            c.sendWatchCacheEvent(&event)
        }
    }
}

sendWatchCacheEvent()

sendWatchCacheEvent() converts the watchCacheEvent into a plain watch.Event and puts it onto the result channel.

//*** put the event onto result ***//
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
    curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
    oldObjPasses := false
    if event.PrevObject != nil {
        oldObjPasses = c.filter(event.Key, event.PrevObject)
    }
    if !curObjPasses && !oldObjPasses {
        // Watcher is not interested in that object.
        return
    }

    object, err := api.Scheme.Copy(event.Object)
    if err != nil {
        glog.Errorf("unexpected copy error: %v", err)
        return
    }
    var watchEvent watch.Event
    switch {
    case curObjPasses && !oldObjPasses:
        watchEvent = watch.Event{Type: watch.Added, Object: object}
    case curObjPasses && oldObjPasses:
        watchEvent = watch.Event{Type: watch.Modified, Object: object}
    case !curObjPasses && oldObjPasses:
        watchEvent = watch.Event{Type: watch.Deleted, Object: object}
    }

    // We need to ensure that if we put event X to the c.result, all
    // previous events were already put into it before, no matter whether
    // c.done is close or not.
    // Thus we cannot simply select from c.done and c.result and this
    // would give us non-determinism.
    // At the same time, we don't want to block infinitely on putting
    // to c.result, when c.done is already closed.

    // This ensures that with c.done already close, we at most once go
    // into the next select after this. With that, no matter which
    // statement we choose there, we will deliver only consecutive
    // events.
    select {
    case <-c.done:
        return
    default:
    }

    select {
    case c.result <- watchEvent:
    case <-c.done:
    }
}
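
The filter used here is the one built by filterFunction() in Watch(): it restricts events to the watched key prefix and then applies the selection predicate. A hedged sketch (helper names such as SimpleFilter and hasPathPrefix are from the source of this era; treat the exact form as an assumption):

// filterFunction returns a filter that accepts only objects under the watched
// key prefix that also match the selection predicate.
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
    f := SimpleFilter(p)
    return func(objKey string, obj runtime.Object) bool {
        if !hasPathPrefix(objKey, key) {
            return false
        }
        return f(obj)
    }
}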

add()

add() puts a watchCacheEvent onto the input channel, blocking for at most the given timeout.

//*** put the event onto input ***//
func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
    // Try to send the event immediately, without blocking.
    select {
    case c.input <- *event:
        return
    default:
    }

    // OK, block sending, but only for up to <timeout>.
    // cacheWatcher.add is called very often, so arrange
    // to reuse timers instead of constantly allocating.
    startTime := time.Now()
    t, ok := timerPool.Get().(*time.Timer)
    if ok {
        t.Reset(*timeout)
    } else {
        t = time.NewTimer(*timeout)
    }
    defer timerPool.Put(t)

    select {
    case c.input <- *event:
        stopped := t.Stop()
        if !stopped {
            // Consume triggered (but not yet received) timer event
            // so that future reuse does not get a spurious timeout.
            <-t.C
        }
    case <-t.C:
        // This means that we couldn't send event to that watcher.
        // Since we don't want to block on it infinitely,
        // we simply terminate it.
        c.forget(false)
        c.stop()
    }

    if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
        *timeout = 0
    }
}
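
The timerPool referenced above is simply a package-level sync.Pool of *time.Timer values (a hedged assumption about its declaration):

// Pool of reusable timers for cacheWatcher.add, to avoid allocating a new
// time.Timer on every blocked send.
var timerPool sync.Pool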

ResultChan()

Every watcher must implement ResultChan(), which returns the result channel.

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
    return c.result
}
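
To tie the pieces together, here is a hypothetical caller-side sketch of consuming a Cacher watch (watchPods, the "/pods/ns1" key and the pred argument are made-up placeholders, not values from the source):

// watchPods shows how a caller consumes the watcher returned by Cacher.Watch.
func watchPods(ctx context.Context, cacher *Cacher, pred SelectionPredicate) error {
    w, err := cacher.Watch(ctx, "/pods/ns1", "0", pred)
    if err != nil {
        return err
    }
    defer w.Stop()
    // The loop ends when the watcher's result channel is closed by process().
    for event := range w.ResultChan() {
        // event.Type is Added/Modified/Deleted; event.Object is the object.
        fmt.Printf("%s: %v\n", event.Type, event.Object)
    }
    return nil
}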

watchCache

watchCache first converts a watch.Event into a watchCacheEvent and then processes it. watchCache itself is a store cache designed to work with a reflector. A watchCacheEvent carries one extra piece of information compared to a watch.Event: the PrevObject. To keep the complexity down, the detailed analysis is omitted here.

watchCache's Add(), Update() and Delete() all call processEvent(), which:

  1. Builds the watchCacheEvent;
  2. Calls onEvent();
  3. Caches the watchCacheEvent.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
    key, err := w.keyFunc(event.Object)
    if err != nil {
        return fmt.Errorf("couldn't compute key: %v", err)
    }
    elem := &storeElement{Key: key, Object: event.Object}

    // TODO: We should consider moving this lock below after the watchCacheEvent
    // is created. In such situation, the only problematic scenario is Replace(
    // happening after getting object from store and before acquiring a lock.
    // Maybe introduce another lock for this purpose.
    w.Lock()
    defer w.Unlock()
    previous, exists, err := w.store.Get(elem)
    if err != nil {
        return err
    }
    var prevObject runtime.Object
    if exists {
        prevObject = previous.(*storeElement).Object
    }
    watchCacheEvent := watchCacheEvent{
        Type:            event.Type,
        Object:          event.Object,
        PrevObject:      prevObject,
        Key:             key,
        ResourceVersion: resourceVersion,
    }
    if w.onEvent != nil {
        w.onEvent(watchCacheEvent)
    }
    w.updateCache(resourceVersion, &watchCacheEvent)
    w.resourceVersion = resourceVersion
    w.cond.Broadcast()
    return updateFunc(elem)
}
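
For completeness, here is a hedged sketch of how Add() feeds processEvent(); Update() and Delete() are analogous, using watch.Modified/watch.Deleted and the matching store operation:

// Add implements the cache.Store interface used by the reflector (sketch;
// objectToVersionedRuntimeObject is a helper from the source of this era).
func (w *watchCache) Add(obj interface{}) error {
    object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
    if err != nil {
        return err
    }
    event := watch.Event{Type: watch.Added, Object: object}

    f := func(elem *storeElement) error { return w.store.Add(elem) }
    return w.processEvent(event, resourceVersion, f)
}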

The onEvent here is exactly Cacher's processEvent() method, which puts the watchCacheEvent onto the Cacher's incoming channel.

reflector

The reflector consumes events from the ListWatcher and, by calling the cache's Add(), Update() and related operations, has the events cached and processed. The reflector itself will be covered in a later article.

Summary

This analysis covered the two entry functions that produce the ETCD Helper: generic.UndecoratedStorage and registry.StorageWithCacher.

generic.UndecoratedStorage simply returns the ETCD Helper.

registry.StorageWithCacher builds on Cacher, cacheWatcher and watchCache. The Cacher first creates the ETCD Helper, then uses the reflector mechanism to feed the ETCD Helper's events into the watchCache, which caches them and, as part of processing, puts them onto the Cacher's incoming channel. The Cacher runs a dispatchEvents loop that distributes events from incoming to each cacheWatcher, i.e. onto each cacheWatcher's input channel. Each cacheWatcher in turn runs a process loop that moves events from input to its result channel, where external consumers pick them up.